
from threading import Thread, RLock, Lock
import threading
import string
import sys
import random 
import time
import os
import sys

from ma.commons.core.taskinfo import *
from ma.commons.core.constants import * 
from ma.commons.core.nodetrackerstatus import *
from ma.utils.timerclass import *

from ma.commons.core.newmaptaskaction import *
from ma.commons.core.maptipinfo import *
from ma.commons.core.reducetipinfo import *
from ma.mrimprov.commons.maptip import *
from ma.mrimprov.commons.reducetip import *
from ma.commons.core.expireunresponsivetasks import *
from ma.commons.core.completedtaskinfo import *
from ma.commons.core.failedtaskinfo import *
from .master import *
import ma.commons.core.outputserver as outputserver

import logging
import logging.config
import ma.const
import ma.log
from ma.fs.dfs._hdfs import HDFS
from ma.stats.statster import Statster 
import ma.net.netutils



class NodeTracker(Thread):
	"""This class will manage the functions of a single node on the cluster, 
	all its communication with the JobsTracker will be initiated by this node. 
	"""
	
	
	def __init__(self):
		"""Build the NodeTracker and register it with the master.

		Connects to the master over XML-RPC, opens an HDFS handle for file
		transfer, starts the output (shuffle) server and the shared timer
		thread, and registers this node's IP with the master to obtain a
		tracker id.  The per-run task bookkeeping is (re)set in initialize()
		so the tracker can be revived after a failure.

		The NodeTracker houses three sets of task dictionaries below; each is
		nested: job_id -> {task_id -> TaskInProgress entry}.
		"""
		
		# initializing the logger
		self.__log = ma.log.get_logger('ma.mrimprov')
		
		# XML-RPC client connection to the master (imported lazily here)
		import xmlrpc.client
		master_ip = ma.const.XmlData.get_str_data(ma.const.xml_master_ip)
		master_port = ma.const.XmlData.get_int_data(ma.const.xml_master_port)
		connection_str = 'http://'+ master_ip +':'+ str(master_port)
		self.__log.debug("Connection string for RPC to Master %s", connection_str)
		self.master = xmlrpc.client.Server(connection_str)
		
		# list of jobs this NT has encountered so far
		self.jobs = []
		
		# This HDFS reference is for fetching or writing input, output or any
		#  intermediary files (not for interacting with the DFS flags)
		host = ma.const.XmlData.get_str_data(ma.const.xml_hdfs_host)
		port = ma.const.XmlData.get_int_data(ma.const.xml_hdfs_port)
		self.hdfs  = HDFS(host, port)
		
		# start output server (data shuffler)
		self.output_server = outputserver.OutputServer(self)
		
		Thread.__init__(self)
		
		# a dict of running tasks task_id -> TIP. This and the subsequent three
		# sets of dicts are a dict embedded in a dictionary. First is having a
		#  job_id as key and value is a dict with Task_id as key and either a
		#  MapTIP or ReduceTIP as values (entries are [TIP, lock] pairs; see
		#  startNewMapTask / startNewReduceTask)
		self.running_map_tasks = {}
		self.running_reduce_tasks = {}
		
		# a double dict of tasks task_id -> TIP that have completed.
		self.complete_map_tasks = {}
		self.complete_reduce_tasks = {}
		
		# a double dict of tasks task_id -> TIP that have failed or failed to respond
		self.failed_map_tasks = {}
		self.failed_reduce_tasks = {}
				
		# every helper thread is collected here so main() can join them
		self.threads = []
		
		# if a task does not send a health ping for this interval it must have
		#  expired
		self.taskexpiry_interval = ma.const.XmlData.get_float_data(ma.const.xml_nt_task_expiry_interval)
		
		# register with the master using this node's IP to obtain a tracker id
		interface_name = ma.const.XmlData.get_str_data(ma.const.xml_network_interface)
		self.__internal_ip = ma.net.netutils.get_ip_address(interface_name)
		tracker_id = self.master.registerNT(self.__internal_ip)
		
		self.__log.info('Starting up the MR+ Nodetracker %s (%s)', str(tracker_id), self.__internal_ip)
		
		# a NodeTrackerStatus object built up to be sent to the JobsTracker along
		#  with every heartbeat.  NOTE(review): attribute name 'tacker_status' is
		#  misspelled but referenced by heartbeat(), so it is kept as-is.
		self.tacker_status = NodeTrackerStatus(tracker_id)
		
		# the interval between heartbeats; sent to the JobTracker
		self.heartbeat_interval = ma.const.XmlData.get_float_data(ma.const.xml_nt_master_heartbeat_interval)
		
		# a dict that keeps track of health ping from tasks running at this NodeTracker
		#  this too is a dict within a dict just as running tasks dict
		#  (entries are [map_or_reduce, last_ping_time, lock])
		self.tasks_last_ping = {}
		
		# timer class that generates or calls a periodic function for this class like the heartbeat call to the master
		self.timer = TimerClass(ma.const.XmlData.get_float_data(ma.const.xml_check_on_timers_interval))
		self.timer.start()
		self.threads.append(self.timer)
		
		# the current task capacity on this node_tracker
		self.task_vacancy = ma.const.XmlData.get_int_data(ma.const.xml_max_no_tasks)
		
		# for logging stats
		self.statster = Statster(self)
		
		self.initialize()
				
		# a list of info objects that is built up from one heartbeat to the next
		#  and passed along every heartbeat call to the HDFSInteracter
		self.info_objects_list = []
		
		# an actions object list that is sent from the HDFSInteractor 
		#  dictating the actions to be taken by the NT in response to its heartbeat call 
		self.actions = []
		
		# self.expiredeadtasks = ExpireUnresponsiveTasks(self, ma.const.XmlData.get_float_data(ma.const.xml_check_task_expiry_interval))
		
		# this is a dict that houses completed maps per jobs job_id -> [list_of_maps completed] 
		self.completed_maps = {0:[]}
		
		# lock serializing heartbeat() against task callbacks
		#  (taskComplete, healthPingFromTask)
		self.__lock = Lock()
		
		
	def initialize(self):
		"""Reset all per-run task bookkeeping.

		Called from the constructor and whenever the NodeTracker must be
		brought back up after a failure: empties the running / complete /
		failed task dictionaries and the health-ping table, then restores the
		configured task capacity.
		"""
		
		# wipe every nested task dictionary in one pass
		for bookkeeping in (self.running_map_tasks, self.running_reduce_tasks,
							self.complete_map_tasks, self.complete_reduce_tasks,
							self.failed_map_tasks, self.failed_reduce_tasks,
							self.tasks_last_ping):
			bookkeeping.clear()
		
		# restore the configured task capacity on this node_tracker
		self.task_vacancy = ma.const.XmlData.get_int_data(ma.const.xml_max_no_tasks)
		
		self.__log.info('(Re)Initialized MR+ Nodetracker')
		
		
	def run(self):		
		"""Main service loop of the NodeTracker thread.

		Registers the periodic heartbeat() callback with the shared timer and
		then idles forever; all real work happens in the timer callback and in
		the individual task threads.
		"""
		
		self.__log.info('Starting up the MR+ NT\'s Heartbeat')
			
		# schedule heartbeat() to fire every heartbeat_interval seconds
		self.hrtbeattimer = self.timer.addTimer(self.heartbeat_interval, self.heartbeat,[])
		
		self.__log.warning('Take out the while loop in the Run function')
		
		# keep this thread alive; the warning above shows the author flagged
		#  this busy-sleep loop for removal
		while True:
			time.sleep(5)
		
			
	def heartbeat(self):
		"""Periodic heartbeat: exchange status with the master over RPC.

		Pickles the accumulated info objects (task completions / failures),
		sends them with the current task vacancy, unpickles the returned
		actions list, cleans up any jobs the master reports complete, and
		dispatches the actions.  Runs under self.__lock so it cannot
		interleave with taskComplete()/healthPingFromTask().  Any failure is
		logged and retried on the next timer tick.

		NOTE(review): `pickle` is not imported by name in this file -- it is
		presumably provided by one of the wildcard imports at the top; confirm.
		"""
				
		# TODO: CODE for talking to HDFS according to info objects in NTStatus object
		#1) check task capacity on NT and pick up new map or reduce tasks from the
		#HDFS after consulting the NT scheduler
		#2)Mark failed tasks as failed
		#3) mark completed tasks as complete
		
		try:
			with self.__lock:
				# marshalling info objects list to be sent to the master
				info_objects_str = pickle.dumps(self.info_objects_list)
			
				# heartbeat call to the master carries info objects list and returns actions object list and any completed jobs info
				actions_str, recently_completed_jobs = self.master.heartbeatFromNT(info_objects_str, self.task_vacancy, self.tacker_status.nodetracker_id)
			
				# unmarshalling the actions object list.  NOTE(review): pickle
				#  is only safe here because the master is a trusted peer --
				#  never feed this path untrusted data.
				self.actions= pickle.loads(actions_str)
				
				# return and print all running processes
				running_procs = self.print_str_running_processes()
				self.__log.info("Currently Running processes: %s", running_procs)
				
				#self.__log.debug('Actions returned from heartbeat: %s', str(self.actions))
				
				# local cleanup for every job the master reports finished
				for job_id in recently_completed_jobs:
					self.jobComplete(job_id)
					
				# work out a series of actions for the NT relevant to the action objects 
				self.processHeartbeatResponse(self.actions)
				
				# reinitialize the info objects list for the next heartbeat call
				self.info_objects_list = []
				
		except Exception as err_msg:
			self.__log.error("Error while sending RPC Message: RPC Failed %s", err_msg) 
			 
		
	def print_str_running_processes(self):
		"""Return a comma-separated listing of every running task on this node.

		Each map task is rendered as "<job_id>M<task_id>" and each reduce task
		as "<job_id>R<task_id>", e.g. "1M10, 1M11, 1R3".  Returns an empty
		string when nothing is running.
		"""

		labels = []
		
		# running map tasks: job_id -> {task_id -> entry}
		for job_id in self.running_map_tasks:
			for task_id in self.running_map_tasks[job_id]:
				labels.append(str(job_id) + "M" + str(task_id))
		
		# running reduce tasks
		for job_id in self.running_reduce_tasks:
			for task_id in self.running_reduce_tasks[job_id]:
				labels.append(str(job_id) + "R" + str(task_id))
		
		# join once instead of quadratic repeated string concatenation
		return ", ".join(labels)
	
		
	def processHeartbeatResponse(self, actions):
		"""Dispatch each Action returned by the master's heartbeat reply.

		Args:
			actions: list of Action objects; each exposes an action_type that
				is matched against the *ACTION_TYPE constants.

		Unknown action types are silently ignored, as before.
		"""
		
		for action in actions:
			
			action_type = action.action_type
			
			# if Action object is NewMapTaskAction
			if action_type == NEWMAPTASKACTION_TYPE:
				# launch the new map task
				self.__log.info('New map task action %s', str(action))
				self.startNewMapTask(action)
				
			# if Action object is NewReduceTaskAction
			elif action_type == NEWREDUCETASKACTION_TYPE:
				# create a new ReduceTIP
				self.__log.info('New reduce task action %s', str(action)) 
				self.startNewReduceTask(action)
			
			# if Action object is KillTaskAction
			elif action_type == KILLTASKACTION_TYPE:
				# BUG FIX: killTask(job_id, map_or_reduce, task_id) takes a
				#  map-or-reduce discriminator (see the "Kill task now
				#  requires a map or reduce parameter" note); the old
				#  two-argument call always raised TypeError.  Assumes
				#  KillTaskAction carries map_or_reduce -- TODO confirm.
				self.killTask(action.job_id, action.map_or_reduce, action.task_id)
			
			# if Action object is KillJobAction
			elif action_type == KILLJOBACTION_TYPE:
				self.killJob(action.job_id)
			
			# if Action object is ReinitNodeTrackerAction
			elif action_type == REINITNODETRACKERACTION_TYPE:
				self.initialize()
			
			# if Action object is DeadNodeTrackerAction
			elif action_type == DEADNODETRACKERACTION_TYPE:
				self.updateIPTranslationEntry(action.dead_node_id)
			
			# if Action object is CompleteJobAction
			elif action_type == COMPLETEJOBSACTION_TYPE:
				self.doHouskeepingForCompleteJobs(action.jobs_list)
		
			# if Action object is RefreshJobsAction
			elif action_type == REFRESHJOBSACTION_TYPE:
				# add the jobs to the scheduler's job dict
				# NOTE(review): self.task_scheduler is never assigned in this
				#  file -- confirm it is set elsewhere before this action fires
				self.task_scheduler.refreshJobsToDict(action.jobs_dict)
				
			
	def getCompleteTaskOutputPath(self, job_id, map_or_reduce, task_id, dest_task_id):	  
		"""Return the local path of a completed task's output file.

		Returns None if this NT never completed the given task.

		Args:
			job_id: id of the job the task belongs to.
			map_or_reduce: MAP/reduce discriminator constant.
			task_id: id of the completed task whose output is wanted.
			dest_task_id: currently unused; kept for interface compatibility.

		Raises:
			IOError: the task is recorded as complete but its output file is
				missing from the standard output location.
		"""
		
		if map_or_reduce == MAP:
			# if map
			if job_id in self.complete_map_tasks and task_id in self.complete_map_tasks[job_id]:
				# build the standard output filename for this job's map output
				self.output_dest_dir = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_output_temp_dir, job_id)
				output_filepath = self.output_dest_dir + os.sep + \
					ma.const.JobsXmlData.get_str_data(ma.const.xml_map_output_filename, job_id, task_id)
					
				# double check that the file really exists on disk
				if os.path.isfile(output_filepath):
					return output_filepath
				else:
					self.__log.error('Although the map task %d is complete but its output is not present at the standard path: %s', task_id, output_filepath)
					# BUG FIX: IOError does not %-interpolate extra args the
					#  way logging does; format the message eagerly
					raise IOError('Missing output file for job id %d, map task %d' % (job_id, task_id))
		else:
			# if reduce
			if job_id in self.complete_reduce_tasks and task_id in self.complete_reduce_tasks[job_id]:
				# build the standard output filename for this job's reduce output
				self.output_dest_dir = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_output_temp_dir, job_id)
				output_filepath = self.output_dest_dir + os.sep + \
					ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_output_filename, job_id, task_id)
					
				# double check that the file really exists on disk
				if os.path.isfile(output_filepath):
					return output_filepath
				else:
					self.__log.error('Although the reduce task %d is complete but its output is not present at the standard path: %s', task_id, output_filepath)
					# BUG FIX: format eagerly (see above)
					raise IOError('Missing output file for job id %d, reduce task %d' % (job_id, task_id))
				
		# task unknown to this NT
		return None
		
		
	def doHouskeepingForCompleteJobs(self, jobs_list):
		"""this function will do housekeeping for complete jobs
		"""
		
		self.__log.error('Need to implement housekeeping for completed job')
		raise NotImplementedError('Complete Jobs housekeeping function yet to be implemented')

	
	def acquireTIPref(self, dict, job_id, task_id):		
		"""Look up the TIP object stored under (job_id, task_id).

		The given double dict maps job_id -> {task_id -> [TIP, lock]}.
		Returns the TIP, or None when either key is missing.
		"""
		
		entry = dict.get(job_id, {}).get(task_id)
		if entry is None:
			return None
		tip, _lock = entry
		return tip
		
		
	def acquireTIPLock(self, dict, job_id, task_id):
		"""this function will acquire the lock for a thread whenever called upon
		every tip in running_map_tasks dict and runing_reduce_tasks dict has a lock on it
		"""
			
		if job_id in dict and task_id in (dict[job_id]):
			object, lock = (dict[job_id])[task_id]
			lock.acquire(1)
			return True
		else:
			return False	  
	
	
	def releaseTIPLock(self, dict, job_id, task_id):
		"""this function will release the lock for a thread whenever called upon
		every tip in running_map_tasks dict and runing_reduce_tasks dict has a lock on it
		"""
		
		if job_id in dict and task_id in (dict[job_id]):
			object, lock = (dict[job_id])[task_id]
			lock.release()
			return True
		else:
			return False

		
	def updateIPTranslationEntry(self, dead_node_id):
		"""This function will update the IP Translation table in case of a 
		NodeTracker failure 
		"""
		
		self.__log.error('Need to implement IP Translation entry :: How it got called?')
		# self.__ip_trans_table.adjustTableDueDeadNode(dead_node_id)
	
	
	def killJob(self, job_id):
		"""Kill a job: drop all its tasks from every bookkeeping dict.

		Frees one task slot per still-running map/reduce task of the job,
		then removes the job's entries from the ping, complete and failed
		dictionaries.  Safe to call for a job unknown to this NT.

		Args:
			job_id: id of the job to purge.
		"""
		
		if job_id in self.running_map_tasks:
			# reclaim capacity for every still-running map task of this job
			self.task_vacancy += len(self.running_map_tasks[job_id])
			
			# del all map tasks of the job from the running tasks dict
			del self.running_map_tasks[job_id]
		
		if job_id in self.running_reduce_tasks:
			# reclaim capacity for every still-running reduce task of this job
			self.task_vacancy += len(self.running_reduce_tasks[job_id])
			
			# del all reduce tasks of the job from the running tasks dict
			del self.running_reduce_tasks[job_id]
			
		# drop the job's entries from the remaining bookkeeping dicts;
		#  pop(..., None) makes each removal a no-op when the key is absent
		for bookkeeping in (self.tasks_last_ping,
							self.complete_map_tasks, self.complete_reduce_tasks,
							self.failed_map_tasks, self.failed_reduce_tasks):
			bookkeeping.pop(job_id, None)
			
	
	def killTask(self, job_id, map_or_reduce, task_id):
		"""Kill a single task: remove it from the running and ping tables.

		Frees the task's slot and prunes the job entry from the relevant
		running dict once its last task is gone.

		Args:
			job_id: id of the job the task belongs to.
			map_or_reduce: MAP/reduce discriminator constant.
			task_id: id of the task to kill.

		Raises:
			Exception: always, after the cleanup -- see the note below.
		"""
		
		self.__log.info("Kill %s task %d request to NT for Job Id %d", map_or_reduce, task_id, job_id)
		
		# pick the running dict matching the task kind
		if map_or_reduce == MAP:
			running = self.running_map_tasks
		else:
			running = self.running_reduce_tasks
		
		if job_id in running and task_id in running[job_id]:
			# del the killed task from the running tasks dict
			del running[job_id][task_id]
			
			# if the task was the last one for this job_id at this NT then
			#  delete the job_id entry itself
			if not running[job_id]:
				del running[job_id]
		
		# TODO: tasks_last_ping also needs to be bifurcated into maps and reduces
		if job_id in self.tasks_last_ping and task_id in self.tasks_last_ping[job_id]:
			# del the killed task from the last ping of tasks dict
			del self.tasks_last_ping[job_id][task_id]
		
		# the killed task's slot becomes available again
		self.task_vacancy += 1
		
		# BUG FIX: the old raise passed %-style arguments that Exception never
		#  interpolates; format the message eagerly.  NOTE(review): this
		#  unconditional raise looks like a debugging tripwire (heartbeat()
		#  catches and logs it); it is preserved to keep behavior visible.
		raise Exception('KillTask has been called %s%s%s' % (str(job_id), map_or_reduce, str(task_id)))
	
	
	def startNewMapTask(self, newmaptaskaction):
		"""Launch a new map task in response to a heartbeat directive.

		Localizes the job's files on first contact, wraps the task in a
		MapTIP thread, registers it in the running-tasks and health-ping
		tables, starts the thread and consumes one task slot.

		Args:
			newmaptaskaction: NewMapTaskAction carrying task_id, job_id,
				input_data and struct_id.
		"""
		
		# a new mapTIPInfo object is created
		maptip_info = MapTIPInfo(newmaptaskaction.task_id, 
								 newmaptaskaction.job_id,
								 newmaptaskaction.input_data,
                                 newmaptaskaction.struct_id)
		
		# logging the map task initiated
		self.__log.info("A Map task %d initiated with Job Id %d", maptip_info.task_id, maptip_info.job_id)
		
		# if the job is new, localize its variables
		if(maptip_info.job_id not in self.jobs):
			self.localizeJob(maptip_info.job_id)
			self.jobs.append(maptip_info.job_id)
			
		# a new MapTIP object is created to be added to the running tasks dict								 
		maptip = MapTIP(self, maptip_info,newmaptaskaction.struct_id)
		self.threads.append(maptip)
		
		# add the MapTIP to the running tasks dict as a [TIP, lock] pair
		if(maptip_info.job_id in self.running_map_tasks):
			(self.running_map_tasks[maptip_info.job_id])[maptip_info.task_id] = [maptip, RLock()]
		else:
			# first task of this job here: create its nested dict and its
			#  completed-maps list
			self.running_map_tasks[maptip_info.job_id] = {maptip_info.task_id: [maptip,RLock()]}
			self.completed_maps[int(maptip_info.job_id)] = []
		
		ping_id = maptip_info.returnTaskID()
		
		# add the MapTIP to the tasks_last_ping dict; entries are
		#  [map_or_reduce, last_ping_time, lock]
		if(maptip_info.job_id in self.tasks_last_ping):
			(self.tasks_last_ping [maptip_info.job_id])[ping_id] = [maptip_info.map_or_reduce, time.time(), RLock()]
		else:
			self.tasks_last_ping[maptip_info.job_id] = {ping_id: [maptip_info.map_or_reduce, time.time(), RLock()]}
						
		# the launch task func called of the TaskInProgress object
		(self.running_map_tasks[maptip_info.job_id])[maptip_info.task_id][0].start()
		
		# one slot consumed: decrement the current task capacity on this node_tracker
		self.task_vacancy -= 1
	
	
	def startNewReduceTask(self, newreducetaskaction):
		"""Launch a new reduce task in response to a heartbeat directive.

		Mirrors startNewMapTask(): localizes the job's files on first contact,
		wraps the task in a ReduceTIP thread, registers it in the
		running-tasks and health-ping tables, starts the thread and consumes
		one task slot.

		Args:
			newreducetaskaction: NewReduceTaskAction carrying job_id, task_id,
				input_data, reduce_level and struct_id.
		"""
		
		# a new reduceTIPInfo object is created
		reducetip_info = ReduceTIPInfo(newreducetaskaction.job_id,
									   newreducetaskaction.task_id,
									   newreducetaskaction.input_data,
                                       newreducetaskaction.reduce_level,
                                       newreducetaskaction.struct_id)
		
		# logging the reduce task initiated
		self.__log.info("A Reduce task %d initiated with Job Id %d", reducetip_info.task_id, reducetip_info.job_id)
		
		# if the job is new, localize its variables
		if(reducetip_info.job_id not in self.jobs):
			self.localizeJob(reducetip_info.job_id)
			self.jobs.append(reducetip_info.job_id)
			
		# a new ReduceTIP object is created to be added to the running tasks dict
		reducetip = ReduceTIP(self, reducetip_info, newreducetaskaction.reduce_level, newreducetaskaction.struct_id)
		self.threads.append(reducetip)
		
		# add the ReduceTIP to the running tasks dict as a [TIP, lock] pair
		if(reducetip_info.job_id in self.running_reduce_tasks):
			(self.running_reduce_tasks[reducetip_info.job_id])[reducetip_info.task_id] = [reducetip, RLock()]
		else:
			self.running_reduce_tasks[reducetip_info.job_id] = {reducetip_info.task_id: [reducetip, RLock()]}
		
		ping_id = reducetip_info.returnTaskID()
		
		# add the ReduceTIP to the tasks_last_ping dict; entries are
		#  [map_or_reduce, last_ping_time, lock]
		if(reducetip_info.job_id in self.tasks_last_ping):
			(self.tasks_last_ping [reducetip_info.job_id])[ping_id] = [reducetip_info.map_or_reduce,time.time(),RLock()]
		else:
			self.tasks_last_ping[reducetip_info.job_id] = {ping_id: [reducetip_info.map_or_reduce,time.time(),RLock()]}
		
		# the launch task func called of the TaskInProgress object
		(self.running_reduce_tasks[reducetip_info.job_id])[reducetip_info.task_id][0].start()
		
		# one slot consumed: decrement the current task capacity on this node_tracker
		self.task_vacancy -= 1
	
	
	def taskComplete(self, task_info):
		"""Callback from a task process reporting successful completion.

		Removes the task's health-ping entry, moves the task from the running
		dict to the matching completed dict, queues a CompletedTaskInfo for
		the next heartbeat and frees the task slot.  Serialized against
		heartbeat() via self.__lock.

		Args:
			task_info: TaskInfo-like object exposing job_id, task_id,
				map_or_reduce and returnTaskID().
		"""
		
		with self.__lock:
			job_id = task_info.job_id
			task_id = task_info.task_id
			m_or_r = task_info.map_or_reduce
			
			self.__log.debug('NT got a notification from an %s task %s of job-id %s for completion', str(m_or_r), str(task_id), str(job_id))
			
			# generate ping id
			ping_id = task_info.returnTaskID()
			
			if job_id in self.tasks_last_ping and ping_id in self.tasks_last_ping[job_id]:
				# remove task off the health ping list
				del (self.tasks_last_ping[job_id])[ping_id]
			
			#if task is a Map task
			if m_or_r == MAP:
				# add job_id to completed tasks dict if already not present
				if job_id not in self.complete_map_tasks:
					self.complete_map_tasks[job_id] = {}
				
				if job_id in self.running_map_tasks:
					if task_id in self.running_map_tasks[job_id]:
						# move task from running to completed tasks
						(self.complete_map_tasks[job_id])[task_id] = (self.running_map_tasks[job_id]).pop(task_id)

				self.__log.info("Completing Map Task %s of job-id %s", str(task_id), str(job_id))
								
				# make a completedTaskInfo object.  NOTE(review): if the task
				#  was not in the running dict (e.g. already killed) the lookup
				#  below raises KeyError -- confirm callers guarantee the task
				#  is still registered when they report completion
				completedTask = CompletedTaskInfo(task_id, job_id, m_or_r, ((self.complete_map_tasks[job_id])[task_id])[0].struct_id)
			
			# for a reduce task	
			else:
				if job_id not in self.complete_reduce_tasks:
					self.complete_reduce_tasks[job_id] = {}
					
				if job_id in self.running_reduce_tasks:
					if task_id in self.running_reduce_tasks[job_id]:
						# move task from running to completed tasks
						(self.complete_reduce_tasks[job_id])[task_id] = (self.running_reduce_tasks[job_id]).pop(task_id)
						
				self.__log.info("Completing Reduce Task %s of job-id %s", str(task_id), str(job_id))
				
				# make a completedTaskInfo object (a reduce also carries its level)
				completedTask = CompletedTaskInfo(task_id, job_id, m_or_r, ((self.complete_reduce_tasks[job_id])[task_id])[0].struct_id,((self.complete_reduce_tasks[job_id])[task_id])[0].reduce_level)
			
			# queue the completion notice for the next heartbeat to the master
			self.info_objects_list.append(completedTask)
			
			# increment the current task capacity on this node_tracker
			self.task_vacancy += 1

	
	def taskFailed(self, job_id, task_id, map_or_reduce):
		"""Callback from a task process reporting failure.

		Queues a FailedTaskInfo for the next heartbeat to the master and
		frees the failed task's slot.

		Args:
			job_id: id of the job the task belongs to.
			task_id: id of the failed task.
			map_or_reduce: map/reduce discriminator for the task.
		"""
		
		self.__log.error("Failed %s task %s request to NT for Job Id %s", map_or_reduce, str(task_id), str(job_id))
		
		# queue the failure notification for the next heartbeat call
		self.info_objects_list.append(FailedTaskInfo(task_id, job_id, map_or_reduce))
		
		# the slot occupied by the failed task is free again
		self.task_vacancy += 1
			
		
	def healthPingFromTask(self, job_id, task_id, m_or_r):
		"""func is called from within a task running in a separate process; to update 
		its health status with the NodeTracker
		"""
		
		with self.__lock:
			# refresh the last ping time for a task if this task_id exists and has not been
			#  killed
			ping_tid = str(task_id) + '_' + m_or_r
			if job_id in self.tasks_last_ping and ping_tid in self.tasks_last_ping[job_id]:
				(self.tasks_last_ping[job_id])[ping_tid] = [time.time(),RLock()]
				
			self.__log.debug("%s Task-id %s with job_id %s just pinged the NT", m_or_r, str(task_id), str(job_id)) 
	
	
	def jobComplete(self, job_id):
		"""This function does all the cleanup work when a completed job
		is reported to the nodetracker
		"""
		
		self.__log.info("/////////// Job %s reported complete :) ///////////", str(job_id))
					
		self.__log.warning("Stopping (and dumping values) of statster prematurely")
		# dump the statster
		self.statster.stop_statster()
		
		# remove from jobs list
		if job_id in self.jobs: 
			self.jobs.remove(job_id)
	

	def localizeJob(self, job_id):
		"""Copy a new job's essential files from the DFS to the local FS.

		Run when this NT receives its first task for job_id: creates the
		local input/output/compute temp directories, fetches the job conf
		file from HDFS (replacing any stale local copy) and reinitializes the
		job's XML data.  Any failure is logged rather than propagated, so a
		broken localization surfaces later in the task itself.

		Args:
			job_id: id of the job whose files must be localized.
		"""
		
		try:
			self.__log.info("Initiating / copying config files for Job id %d", job_id)
			
			# get the configured local temp directories for this job
			input_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, job_id)
			output_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_output_temp_dir, job_id)
			compute_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_compute_temp_dir, job_id)
			
			# create directories that do not exist yet
			if not os.path.isdir(input_path):
				os.makedirs(input_path)
			if not os.path.isdir(output_path):
				os.makedirs(output_path)
			if not os.path.isdir(compute_path):
				os.makedirs(compute_path)
			
			# locate the job conf on the DFS and its local destination dir
			job_conf_local_dir = os.path.dirname(ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, job_id))
			job_conf_dfs_filepath = ma.const.JobsXmlData.get_dfs_filepath_str_data(ma.const.xml_dfs_path_job_conf, job_id)
			
			if self.hdfs.check_path(job_conf_dfs_filepath) == 1:
				# delete any stale local copy before fetching the fresh one
				local_jobconf_filepath = job_conf_local_dir + os.sep + os.path.basename(job_conf_dfs_filepath)
				if os.path.isfile(local_jobconf_filepath):
					os.remove(local_jobconf_filepath)
				
				self.hdfs.copy_file_to_local(job_conf_dfs_filepath, job_conf_local_dir, job_id, auto_retry=True)
			else:
				self.__log.error('Job Conf for job_id %d does not exist', job_id)
			
			# reinitialize the jobs XML data to remove all previous values
			ma.const.JobsXmlData.reinitialize(job_id)
			
		except Exception as err_msg:
			self.__log.error('Error while localizing new job: %s', err_msg) 


	def returnNoOfMaps(self, job_id=None):
		"""Count running and completed map tasks on this nodetracker.

		Args:
			job_id: when None (default), counts across all jobs; otherwise
				counts only tasks of the given job.

		Returns:
			(no_of_running_maps, no_of_complete_maps) tuple of ints.
		"""
		
		if job_id is None:
			# global totals across every job
			no_of_running_maps = sum(len(tasks) for tasks in self.running_map_tasks.values())
			no_of_complete_maps = sum(len(tasks) for tasks in self.complete_map_tasks.values())
		else:
			# job-specific counts; missing job counts as zero
			no_of_running_maps = len(self.running_map_tasks.get(job_id, []))
			no_of_complete_maps = len(self.complete_map_tasks.get(job_id, []))
			
		return no_of_running_maps, no_of_complete_maps


	def returnNoOfReduces(self, job_id=None):
		"""Count running and completed reduce tasks on this nodetracker.

		Args:
			job_id: when None (default), counts across all jobs; otherwise
				counts only tasks of the given job.

		Returns:
			(no_of_running_reduces, no_of_complete_reduces) tuple of ints.
		"""
		
		if job_id is None:
			# global totals across every job
			no_of_running_reduces = sum(len(tasks) for tasks in self.running_reduce_tasks.values())
			no_of_complete_reduces = sum(len(tasks) for tasks in self.complete_reduce_tasks.values())
		else:
			# job-specific counts; missing job counts as zero
			no_of_running_reduces = len(self.running_reduce_tasks.get(job_id, []))
			no_of_complete_reduces = len(self.complete_reduce_tasks.get(job_id, []))

		return no_of_running_reduces, no_of_complete_reduces



def main(argv=None):
	"""Entry point: build a NodeTracker, start it and block forever.

	Args:
		argv: unused; kept for the conventional main(argv) signature.
	"""
	print('--- Starting up MR+ NodeTracker compute engine ---')
	
	nodetracker = NodeTracker()
	
	nodetracker.start()
	nodetracker.threads.append(nodetracker)
	# join every thread (including the tracker itself); none is expected to
	#  terminate, so this blocks for the process lifetime.  The list may grow
	#  while iterating -- TIP threads appended later are joined too.
	for th in nodetracker.threads:
		th.join()	  
		
	#nodetracker.threads.append(nodetracker)
	
		
if __name__ == "__main__":
	sys.exit(main())

